package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessageLogbackConsumer { // 消息消费者
    // 如果此时你使用的是集群服务，则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group-logback"; // 定义消费组
    public static final String TOPIC = "TopicLogback"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制，所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类，但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作，在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach((msg)->{
                    System.out.printf("【%s】接收到新的消息：body = %s、level = %s %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getProperty("level"));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}
